In [1]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

KubeFlow Pipeline local development quickstart

In this notebook, we will demo how to:

  • Author components, both with the lightweight Python method and by wrapping existing container images with ContainerOp.
  • Author pipelines.

Note: Make sure that you have Docker installed in your local environment.

Setup


In [2]:
# PROJECT_ID is used to construct the docker image registry. We will use Google Container Registry, 
# but any other accessible registry works as well. 
PROJECT_ID='Your-Gcp-Project-Id'

In [ ]:
# Install Pipeline SDK
!pip3 install kfp --upgrade
!mkdir -p tmp/pipelines

Part 1

Two ways to author a component to list blobs in a GCS bucket

A pipeline is composed of one or more components. In this section, you will build a single component that lists the blobs in a GCS bucket, and then a pipeline that consists of this component. There are two ways to author a component; the following sections walk through each of them.

1. Create a lightweight python component from a Python function.

1.1 Define component function

The requirements for the component function:

  • The function must be stand-alone.
  • The function can only import packages that are available in the base image.
  • If the function operates on numbers, the parameters must have type hints. Supported types are int, float, bool. Everything else is passed as str, that is, string.
  • To build a component with multiple output values, use Python’s typing.NamedTuple type hint syntax, as sketched below.

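For example, a lightweight component with multiple outputs can be sketched as follows. This is only an illustration of the NamedTuple syntax; the function and output names are hypothetical and not used in the pipeline below.

from typing import NamedTuple

def divide(dividend: int, divisor: int) -> NamedTuple('DivideOutputs', [('quotient', int), ('remainder', int)]):
  '''Returns both the quotient and the remainder.'''
  import collections
  outputs = collections.namedtuple('DivideOutputs', ['quotient', 'remainder'])
  return outputs(dividend // divisor, dividend % divisor)
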
In [4]:
def list_blobs(bucket_name: str) -> str:
  '''Lists all the blobs in the bucket.'''
  import subprocess

  subprocess.call(['pip', 'install', '--upgrade', 'google-cloud-storage'])
  from google.cloud import storage
  storage_client = storage.Client()
  bucket = storage_client.get_bucket(bucket_name)
  list_blobs_response = bucket.list_blobs()
  blobs = ','.join([blob.name for blob in list_blobs_response])
  print(blobs)
  return blobs

1.2 Create a lightweight Python component


In [5]:
import kfp.components as comp

# Converts the function to a lightweight Python component.
list_blobs_op = comp.func_to_container_op(list_blobs)

1.3 Define pipeline

Note that when accessing Google Cloud Storage, you need to make sure the pipeline can authenticate to GCP. Refer to Authenticating Pipelines to GCP for details.


In [7]:
import kfp.dsl as dsl

# Defines the pipeline.
@dsl.pipeline(name='List GCS blobs', description='Lists GCS blobs.')
def pipeline_func(bucket_name):
  list_blobs_task = list_blobs_op(bucket_name)
  # Use the following commented code instead if you want to use GSA key for authentication.
  #
  # from kfp.gcp import use_gcp_secret
  # list_blobs_task = list_blobs_op(bucket_name).apply(use_gcp_secret('user-gcp-sa'))
  # Same for below.

# Compile the pipeline to a file.
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, 'tmp/pipelines/list_blobs.pipeline.tar.gz')

2. Wrap an existing Docker container image using ContainerOp

2.1 Create a Docker container

Create your own container image that includes your program. If your component creates outputs to be fed as inputs to downstream components, each separate output must be written as a string to a separate local text file inside the container. For example, if a trainer component needs to output the trained model path, it can write the path to a local file such as /output.txt. The string written to an output file should not be too big; if it is (much more than 100 kB), it is recommended to save the output to external persistent storage and pass the storage path to the next component instead.
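
As a minimal sketch of this convention (the value written below is a hypothetical model path), the last lines of such a trainer program could look like this:

# Write the single string output to a local file so the pipeline can pass it
# to a downstream component as an input argument.
with open('/output.txt', 'w') as f:
  f.write('gs://your-bucket/models/model-001')  # hypothetical trained model path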

Make sure you have set your Google Cloud Platform project ID in the Setup section above; it is used to tag and push the container image.

The following cell creates a file app.py that contains a Python script. The script takes a GCS bucket name as an input argument, gets the list of blobs in that bucket, prints the list, and also writes it to an output file.


In [8]:
%%bash

# Create folders if they don't exist.
mkdir -p tmp/components/list-gcs-blobs

# Create the Python file that lists GCS blobs.
cat > ./tmp/components/list-gcs-blobs/app.py <<HERE
import argparse
from google.cloud import storage
# Parse arguments.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--bucket', type=str, required=True, help='GCS bucket name.')
args = parser.parse_args()
# Create a client.
storage_client = storage.Client()
# List blobs.
bucket = storage_client.get_bucket(args.bucket)
list_blobs_response = bucket.list_blobs()
blobs = ','.join([blob.name for blob in list_blobs_response])
print(blobs)
with open('/blobs.txt', 'w') as f:
  f.write(blobs)
HERE

Now create a container that runs the script. Start by creating a Dockerfile. A Dockerfile contains the instructions to assemble a Docker image. The FROM statement specifies the base image from which you are building. WORKDIR sets the working directory. When you assemble the Docker image, COPY copies the required files and directories (for example, app.py) into the filesystem of the container. RUN executes a command (for example, installing the dependencies) and commits the result.


In [9]:
%%bash

# Create Dockerfile.
cat > ./tmp/components/list-gcs-blobs/Dockerfile <<EOF
FROM python:3.6-slim
WORKDIR /app
COPY . /app
RUN pip install --upgrade google-cloud-storage
EOF

Now that you have created the Dockerfile, you can build the Docker image and push it to a registry that will host it. The following cell creates a shell script that builds the container image and pushes it to the Google Container Registry.


In [10]:
%%bash -s "{PROJECT_ID}"

IMAGE_NAME="listgcsblobs"
TAG="latest" # "v_$(date +%Y%m%d_%H%M%S)"

# Create script to build docker image and push it.
cat > ./tmp/components/list-gcs-blobs/build_image.sh <<HERE
PROJECT_ID="${1}"
IMAGE_NAME="${IMAGE_NAME}"
TAG="${TAG}"
GCR_IMAGE="gcr.io/\${PROJECT_ID}/\${IMAGE_NAME}:\${TAG}"
docker build -t \${IMAGE_NAME} .
docker tag \${IMAGE_NAME} \${GCR_IMAGE}
docker push \${GCR_IMAGE}
docker image rm \${IMAGE_NAME}
docker image rm \${GCR_IMAGE}
HERE

Run the script.


In [ ]:
%%bash

# Build and push the image.
cd tmp/components/list-gcs-blobs
bash build_image.sh

2.2 Define each component

Define a component by creating an instance of kfp.dsl.ContainerOp that describes the interactions with the Docker container image created in the previous step. You need to specify the component name, the image to use, the command to run after the container starts, the input arguments, and the file outputs.


In [12]:
import kfp.dsl

def list_gcs_blobs_op(name, bucket):
  return kfp.dsl.ContainerOp(
      name=name,
      image='gcr.io/{}/listgcsblobs:latest'.format(PROJECT_ID),
      command=['python', '/app/app.py'],
      file_outputs={'blobs': '/blobs.txt'},
      arguments=['--bucket', bucket]
  )

2.3 Create your workflow as a Python function

Start by creating a folder to store the pipeline file.


In [13]:
# Create folders if they don't exist.
!mkdir -p tmp/pipelines

Define your pipeline as a Python function. The @kfp.dsl.pipeline decorator is required and takes name and description properties. Then compile the pipeline function. After compilation completes, a pipeline file is created.


In [14]:
import datetime
import kfp.compiler as compiler

# Define the pipeline
@kfp.dsl.pipeline(
  name='List GCS Blobs',
  description='Takes a GCS bucket name as input and lists the blobs.'
)
def pipeline_func(bucket='Enter your bucket name here.'):
  list_blobs_task = list_gcs_blobs_op('List', bucket)

# Compile the pipeline to a file.
filename = 'tmp/pipelines/list_blobs{dt:%Y%m%d_%H%M%S}.pipeline.tar.gz'.format(
    dt=datetime.datetime.now())
compiler.Compiler().compile(pipeline_func, filename)

Follow the instructions on kubeflow.org to access Kubeflow UIs. Upload the created pipeline and run it.
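
Alternatively, you can upload and run the compiled package directly from the SDK instead of the UI. A minimal sketch, assuming you know the endpoint of your Kubeflow Pipelines installation (the host value below is a placeholder):

import kfp

# Connect to the Kubeflow Pipelines API and submit the compiled package.
client = kfp.Client(host='<your-kfp-endpoint>')
experiment = client.create_experiment('quickstart')
run = client.run_pipeline(experiment.id, 'list-blobs-run', filename)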

Warning: When the pipeline runs, it pulls the image from the registry to the Kubernetes cluster to create a container. Kubernetes caches pulled images, so a node may keep using a stale build behind a mutable tag. One solution is to reference the image by digest instead of by tag in your component DSL, for example replacing :v1 with @sha256:9509182e27dcba6d6903fccf444dc6188709cc094a018d5dd4211573597485c9. Alternatively, if you don't want to update the digest every time, you can use the :latest tag, which makes Kubernetes always pull the latest image.
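
The sketch below shows what a digest-pinned variant of list_gcs_blobs_op could look like; the function name and the <your-image-digest> placeholder are illustrative, so substitute the digest printed by docker push or shown in Container Registry.

import kfp.dsl

def list_gcs_blobs_pinned_op(name, bucket):
  return kfp.dsl.ContainerOp(
      name=name,
      # Pinning by digest makes every run use exactly this build, regardless
      # of what a mutable tag currently points to.
      image='gcr.io/{}/listgcsblobs@sha256:<your-image-digest>'.format(PROJECT_ID),
      command=['python', '/app/app.py'],
      file_outputs={'blobs': '/blobs.txt'},
      arguments=['--bucket', bucket]
  )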


Part 2

Create a pipeline using Kubeflow Pipelines

In this section, you will build another component. Then you will see how to connect components to build a multi-component pipeline. You will build the new component by building a Docker container image and wrapping it using ContainerOp.

1 Create a container to view CSV

Build a component that takes the output of the first component from the preceding section (that is, the list of GCS blobs), selects the file ending in iris.csv, and displays its content as an artifact. Start by uploading the quickstart_iris.csv file that is included in the repository to your storage bucket.
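
If the file is not in your bucket yet, one way to upload it is with the gsutil CLI; this assumes gsutil is installed, quickstart_iris.csv is in the current directory, and the bucket name below is a placeholder.

!gsutil cp quickstart_iris.csv gs://<your-bucket-name>/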


In [ ]:
%%bash -s "{PROJECT_ID}"
# Create folders if they don't exist.
mkdir -p tmp/components/view-input


# Create the Python file that selects and views the input CSV.
cat > ./tmp/components/view-input/app.py <<HERE
import argparse
import json
from google.cloud import storage
# Parse arguments.
parser = argparse.ArgumentParser()
parser.add_argument('--blobs', type=str, required=True, help='List of blobs.')
args = parser.parse_args()
blobs = args.blobs.split(',')
inputs = filter(lambda s: s.endswith('iris.csv'), blobs)
input = list(inputs)[0]
print('The CSV file is {}'.format(input))
# CSV header.
header = [
    'sepal_length',
    'sepal_width',
    'petal_length',
    'petal_width',
    'species',
]
# Add a metadata for an artifact.
metadata = {
  'outputs' : [{
    'type': 'table',
    'storage': 'gcs',
    'format': 'csv',
    'header': header,
    'source': input
  }]
}
print(metadata)
# Create an artifact.
with open('/mlpipeline-ui-metadata.json', 'w') as f:
  json.dump(metadata, f)
HERE


# Create Dockerfile.
cat > ./tmp/components/view-input/Dockerfile <<HERE
FROM python:3.6-slim
WORKDIR /app
COPY . /app
RUN pip install --upgrade google-cloud-storage
HERE


# Create script to build docker image and push it.
IMAGE_NAME="viewinput"
TAG="latest" # "v_$(date +%Y%m%d_%H%M%S)"
cat > ./tmp/components/view-input/build_image.sh <<HERE
PROJECT_ID="${1}"
IMAGE_NAME="${IMAGE_NAME}"
TAG="${TAG}"
GCR_IMAGE="gcr.io/\${PROJECT_ID}/\${IMAGE_NAME}:\${TAG}"
docker build -t \${IMAGE_NAME} .
docker tag \${IMAGE_NAME} \${GCR_IMAGE}
docker push \${GCR_IMAGE}
docker image rm \${IMAGE_NAME}
docker image rm \${GCR_IMAGE}
HERE


# Build and push the image.
cd tmp/components/view-input
bash build_image.sh

2 Define each component

Define each of your components by using kfp.dsl.ContainerOp. Describe the interactions with the Docker container image created in the previous step by specifying the component name, the image to use, the command to run after the container starts, the input arguments, and the file outputs.


In [16]:
import kfp.dsl

def list_gcs_blobs_op(name, bucket):
  return kfp.dsl.ContainerOp(
      name=name,
      image='gcr.io/{}/listgcsblobs:latest'.format(PROJECT_ID),
      command=['python', '/app/app.py'],
      arguments=['--bucket', bucket],
      file_outputs={'blobs': '/blobs.txt'},
      output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'},
  )

def view_input_op(name, blobs):
  return kfp.dsl.ContainerOp(
      name=name,
      image='gcr.io/{}/viewinput:latest'.format(PROJECT_ID),
      command=['python', '/app/app.py'],
      arguments=['--blobs', blobs]
  )

3 Create your workflow as a Python function

Define your pipeline as a Python function. The @kfp.dsl.pipeline decorator is required and takes name and description properties. pipeline_func defines the pipeline with the bucket parameter. When a user uploads the pipeline to the system and starts creating a new run from it, they'll see an input box for the bucket parameter with the initial value Enter your bucket name here.; replace it with your bucket name at runtime. list_gcs_blobs_op('List', bucket) creates a component named List that lists the blobs. view_input_op('View', list_blobs_task.outputs['blobs']) creates a component named View that views a CSV. list_blobs_task.outputs['blobs'] tells the pipeline to take the output of the first component, stored as a string in blobs.txt, as an input for the second component.


In [17]:
# Create folders if they don't exist.
!mkdir -p tmp/pipelines

In [18]:
import datetime
import kfp.compiler as compiler

# Define the pipeline
@kfp.dsl.pipeline(
  name='Quickstart pipeline',
  description='Takes a GCS bucket name and views a CSV input file in the bucket.'
)
def pipeline_func(bucket='Enter your bucket name here.'):
  list_blobs_task = list_gcs_blobs_op('List', bucket)
  view_input_task = view_input_op('View', list_blobs_task.outputs['blobs'])

# Compile the pipeline to a file.
filename = 'tmp/pipelines/quickstart_pipeline{dt:%Y%m%d_%H%M%S}.pipeline.tar.gz'.format(
    dt=datetime.datetime.now())
compiler.Compiler().compile(pipeline_func, filename)

Follow the instructions on kubeflow.org to access Kubeflow UIs. Upload the created pipeline and run it.

Clean up


In [19]:
import shutil
import pathlib
path = pathlib.Path("tmp")
shutil.rmtree(path)